1 /**
2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module my.actor.actor;
7 
8 import std.stdio : writeln, writefln;
9 
10 import core.thread : Thread;
11 import logger = std.experimental.logger;
12 import std.algorithm : schwartzSort, max, min, among;
13 import std.array : empty;
14 import std.datetime : SysTime, Clock, dur;
15 import std.exception : collectException;
16 import std.functional : toDelegate;
17 import std.meta : staticMap;
18 import std.traits : Parameters, Unqual, ReturnType, isFunctionPointer, isFunction;
19 import std.typecons : Tuple, tuple;
20 import std.variant : Variant;
21 
22 import my.actor.common : ExitReason, SystemError, makeSignature;
23 import my.actor.mailbox;
24 import my.actor.msg;
25 import my.actor.system : System;
26 import my.actor.typed : isTypedAddress, isTypedActorImpl;
27 import my.gc.refc;
28 import sumtype;
29 
/// State shared by all copies of a `Promise`: the address the reply is put to
/// and the id that is stored in the `Reply`.
private struct PromiseData {
    /// Address that `Promise.deliver` puts the reply to.
    WeakAddress replyTo;
    /// Id placed in the delivered `Reply`.
    ulong replyId;

    // Copies go through the copy constructor below; the postblit is disabled.
    @disable this(this);

    /// Copy constructor
    this(ref return scope typeof(this) other) @safe nothrow @nogc {
        this.replyId = other.replyId;
        this.replyTo = other.replyTo;
    }
}
42 
// deliver can only be called one time.
/** A reply that is delivered at a later point in time.
 *
 * The promise shares its `PromiseData` (reply address and reply id) between
 * all copies through a reference counted handle. Delivering releases this
 * copy's handle.
 */
struct Promise(T) {
    package {
        RefCounted!PromiseData data;
    }

    /** Deliver `reply` when it is passed by value (e.g. an rvalue).
     *
     * `reply` is an lvalue inside this function so the call below resolves to
     * the `ref` overload; no intermediate copy is needed.
     */
    void deliver(T reply) {
        deliver(reply);
    }

    /** Deliver the message `reply`.
     *
     * A promise can only be delivered once.
     *
     * Params:
     *  reply = the value to send. It is wrapped in a `Tuple` unless it
     *          already is one.
     */
    void deliver(ref T reply) @trusted
    in (!data.empty, "promise must be initialized") {
        // contracts may be compiled out, thus guard in release builds too.
        if (data.empty)
            return;
        // release the handle whether or not the receiver is still reachable.
        scope (exit)
            data.release;

        // TODO: should probably call delivering actor with an ErrorMsg if replyTo is closed.
        if (auto replyTo = data.get.replyTo.lock.get) {
            enum wrapInTuple = !is(T : Tuple!U, U);
            static if (wrapInTuple)
                replyTo.put(Reply(data.get.replyId, Variant(tuple(reply))));
            else
                replyTo.put(Reply(data.get.replyId, Variant(reply)));
        }
    }

    /// Share the promise data of `rhs`.
    void opAssign(Promise!T rhs) {
        data = rhs.data;
    }

    /// True if the promise is not initialized.
    bool empty() {
        return data.empty || data.get.replyId == 0;
    }

    /// Clear the promise.
    void clear() {
        data.release;
    }
}
89 
/// Create a `Promise!T` backed by a freshly allocated, default initialized `PromiseData`.
auto makePromise(T)() {
    alias Result = Promise!T;
    return Result(refCounted(PromiseData.init));
}
93 
/** Return type for a request handler that can hold one of three things: the
 * value itself, an `ErrorMsg` or a `Promise!T`.
 */
struct RequestResult(T) {
    /// The stored alternative.
    SumType!(T, ErrorMsg, Promise!T) value;

    private alias Storage = typeof(value);

    /// Store a value.
    this(T v) {
        value = Storage(v);
    }

    /// Store an error.
    this(ErrorMsg v) {
        value = Storage(v);
    }

    /// Store a promise.
    this(Promise!T v) {
        value = Storage(v);
    }
}
109 
/// Behavior for a one-shot message. Called with the closure's context pointer and the message payload.
private alias MsgHandler = void delegate(void* ctx, ref Variant msg) @safe;
/// Behavior for a request. In addition to the payload it receives the reply
/// id and the address taken from the request.
private alias RequestHandler = void delegate(void* ctx, ref Variant msg,
        ulong replyId, WeakAddress replyTo) @safe;
/// Behavior called with the payload of a reply whose id is awaited.
private alias ReplyHandler = void delegate(void* ctx, ref Variant msg) @safe;

/// Fallback called for a message whose signature has no registered behavior
/// and for a reply whose id is not awaited.
alias DefaultHandler = void delegate(ref Actor self, ref Variant msg) @safe nothrow;

/** Actors send error messages to others by returning an error (see Errors)
 * from a message handler. Similar to exit messages, error messages usually
 * cause the receiving actor to terminate, unless a custom handler was
 * installed. The default handler is used as fallback if request is used
 * without error handler.
 */
alias ErrorHandler = void delegate(ref Actor self, ErrorMsg) @safe nothrow;

/** Bidirectional monitoring with a strong lifetime coupling is established by
 * calling a `LinkRequest` to an address. This will cause the runtime to send
 * an `ExitMsg` if either this or other dies. Per default, actors terminate
 * after receiving an `ExitMsg` unless the exit reason is exit_reason::normal.
 * This mechanism propagates failure states in an actor system. Linked actors
 * form a sub system in which an error causes all actors to fail collectively.
 */
alias ExitHandler = void delegate(ref Actor self, ExitMsg msg) @safe nothrow;

/// An exception has been thrown while processing a message.
alias ExceptionHandler = void delegate(ref Actor self, Exception e) @safe nothrow;

/** Actors can monitor the lifetime of other actors by sending a `MonitorRequest`
 * to an address. This will cause the runtime system to send a `DownMsg` for
 * other if it dies.
 *
 * Actors drop down messages unless they provide a custom handler.
 */
alias DownHandler = void delegate(ref Actor self, DownMsg msg) @safe nothrow;
144 
/// A `DefaultHandler` that does nothing, i.e. the message is silently dropped.
void defaultHandler(ref Actor self, ref Variant msg) @safe nothrow {
}
147 
/// A `DefaultHandler` that prints the actor's name and the message to the
/// console and then drops the message.
void logAndDropHandler(ref Actor self, ref Variant msg) @trusted nothrow {
    import std.stdio : writeln;

    // best effort, a failure to print is ignored.
    try {
        const who = self.name;
        writeln("UNKNOWN message sent to actor ", who);
        const content = msg.toString;
        writeln(content);
    } catch (Exception) {
    }
}
158 
/// Default `ErrorHandler`: remember the reason of the error and request a clean shutdown.
void defaultErrorHandler(ref Actor self, ErrorMsg msg) @safe nothrow {
    auto why = msg.reason;
    self.lastError = why;
    self.shutdown();
}
163 
/// Default `ExitHandler`: remember the reason of the exit and force a shutdown.
void defaultExitHandler(ref Actor self, ExitMsg msg) @safe nothrow {
    auto why = msg.reason;
    self.lastError = why;
    self.forceShutdown();
}
168 
/// Default `ExceptionHandler`: record a runtime error and force a shutdown.
/// The exception itself is not inspected.
void defaultExceptionHandler(ref Actor self, Exception e) @safe nothrow {
    // TODO: should log?
    enum why = SystemError.runtimeError;
    self.lastError = why;
    self.forceShutdown();
}
174 
// An `ExceptionHandler` that records a runtime error, writes the name of the
// actor and the exception message to stdout and then forces a shutdown.
void logExceptionHandler(ref Actor self, Exception e) @safe nothrow {
    import std.stdio : writeln;

    self.lastError = SystemError.runtimeError;

    // best effort, a failure to print must not stop the shutdown.
    try {
        writeln("EXCEPTION thrown by actor ", self.name);
        writeln(e.msg);
        writeln("TERMINATING");
    } catch (Exception ignored) {
    }

    self.forceShutdown();
}
190 
/// Timeout for an outstanding request.
struct ReplyHandlerTimeout {
    /// Reply id of the awaited response the timeout belongs to.
    ulong id;
    /// Point in time after which the request is treated as timed out.
    SysTime timeout;
}
196 
/// Life cycle of an actor. `Actor.process` steps the actor through the states.
package enum ActorState {
    /// waiting to be started.
    waiting,
    /// active and processing messages.
    active,
    /// wait for all awaited responses to finish
    shutdown,
    /// also discard the awaited responses, just shut down fast
    forceShutdown,
    /// in process of shutting down
    finishShutdown,
    /// stopped.
    stopped,
}
211 
/// Bookkeeping for one outstanding request.
private struct AwaitReponse {
    /// Called with the payload when the reply arrives.
    Closure!(ReplyHandler, void*) behavior;
    /// Called with `SystemError.requestTimeout` when the request times out.
    ErrorHandler onError;
}
216 
/** An actor: an address (mailbox) together with the behaviors that react to
 * what arrives in it.
 *
 * The actor does nothing by itself. It is driven by repeated calls to
 * `process` which handles pending messages and steps the actor through the
 * `ActorState` life cycle.
 */
struct Actor {
    import std.container.rbtree : RedBlackTree, redBlackTree;

    package StrongAddress addr;
    // visible in the package for logging purpose.
    package ActorState state_ = ActorState.stopped;

    private {
        // TODO: rename to behavior.
        Closure!(MsgHandler, void*)[ulong] incoming;
        Closure!(RequestHandler, void*)[ulong] reqBehavior;

        // callbacks for awaited responses key:ed on their id.
        AwaitReponse[ulong] awaitedResponses;
        // kept sorted on the timeout, earliest first (see `register`).
        ReplyHandlerTimeout[] replyTimeouts;

        // important that it start at 1 because then zero is known to not be initialized.
        ulong nextReplyId = 1;

        /// Delayed messages ordered by their trigger time.
        RedBlackTree!(DelayedMsg*, "a.triggerAt < b.triggerAt", true) delayed;

        /// Used during shutdown to signal monitors and links why this actor is terminating.
        SystemError lastError;

        /// monitoring the actor lifetime.
        WeakAddress[size_t] monitors;

        /// strong, bidirectional link of the actors lifetime.
        WeakAddress[size_t] links;

        // Number of messages that has been processed.
        ulong messages_;

        /// System the actor belongs to.
        System* homeSystem_;

        string name_;

        ErrorHandler errorHandler_;

        /// callback when a link goes down.
        DownHandler downHandler_;

        ExitHandler exitHandler_;

        ExceptionHandler exceptionHandler_;

        DefaultHandler defaultHandler_;
    }

    // An actor with an address must have all mandatory handlers installed,
    // except while it is waiting to start or is shutting down cleanly.
    invariant () {
        if (addr && !state_.among(ActorState.waiting, ActorState.shutdown)) {
            assert(errorHandler_);
            assert(exitHandler_);
            assert(exceptionHandler_);
            assert(defaultHandler_);
        }
    }

    /** Construct an actor in the `waiting` state that owns the address `a`.
     *
     * The address is opened and the default handlers are installed. No down
     * handler is installed.
     */
    this(StrongAddress a) @trusted
    in (!a.empty, "address is empty") {
        state_ = ActorState.waiting;

        addr = a;
        addr.get.setOpen;
        delayed = new typeof(delayed);

        errorHandler_ = toDelegate(&defaultErrorHandler);
        downHandler_ = null;
        exitHandler_ = toDelegate(&defaultExitHandler);
        exceptionHandler_ = toDelegate(&defaultExceptionHandler);
        defaultHandler_ = toDelegate(&.defaultHandler);
    }

    /// Returns: a weak reference to the address of the actor.
    WeakAddress address() @safe {
        return addr.weakRef;
    }

    package ref StrongAddress addressRef() return @safe pure nothrow @nogc {
        return addr;
    }

    /// Returns: the system set by `setHomeSystem`. It is dereferenced
    /// without a null check.
    ref System homeSystem() @safe pure nothrow @nogc {
        return *homeSystem_;
    }

    /** Clean shutdown of the actor
     *
     * Stopping incoming messages from triggering new behavior and finish all
     * awaited responses.
     */
    void shutdown() @safe nothrow {
        if (state_.among(ActorState.waiting, ActorState.active))
            state_ = ActorState.shutdown;
    }

    /** Force an immediate shutdown.
     *
     * Stopping incoming messages from triggering new behavior and discard
     * all awaited responses without waiting for them.
     */
    void forceShutdown() @safe nothrow {
        if (state_.among(ActorState.waiting, ActorState.active, ActorState.shutdown))
            state_ = ActorState.forceShutdown;
    }

    /// Returns: the id of the actor's address.
    ulong id() @safe pure nothrow const @nogc {
        return addr.id;
    }

    /// Returns: the name of the actor.
    string name() @safe pure nothrow const @nogc {
        return name_;
    }

    // dfmt off

    /// Set name name of the actor.
    void name(string n) @safe pure nothrow @nogc {
        this.name_ = n;
    }

    /// Replace the handler for `ErrorMsg`.
    void errorHandler(ErrorHandler v) @safe pure nothrow @nogc {
        errorHandler_ = v;
    }

    /// Replace the handler for `DownMsg`. A null handler drops the messages.
    void downHandler(DownHandler v) @safe pure nothrow @nogc {
        downHandler_ = v;
    }

    /// Replace the handler for `ExitMsg`.
    void exitHandler(ExitHandler v) @safe pure nothrow @nogc {
        exitHandler_ = v;
    }

    /// Replace the handler for exceptions thrown while processing.
    void exceptionHandler(ExceptionHandler v) @safe pure nothrow @nogc {
        exceptionHandler_ = v;
    }

    /// Replace the fallback handler for unmatched messages.
    void defaultHandler(DefaultHandler v) @safe pure nothrow @nogc {
        defaultHandler_ = v;
    }

    // dfmt on

package:
    /// True if the actor has an address and that address reports a pending message.
    bool hasMessage() @safe pure nothrow @nogc {
        return addr && addr.get.hasMessage;
    }

    /// How long until a delayed message or a timeout fires.
    Duration nextTimeout(const SysTime now, const Duration default_) @safe {
        return min(delayed.empty ? default_ : (delayed.front.triggerAt - now),
                replyTimeouts.empty ? default_ : (replyTimeouts[0].timeout - now));
    }

    /// True if there is at least one awaited response.
    bool waitingForReply() @safe pure nothrow const @nogc {
        return !awaitedResponses.empty;
    }

    /// Number of messages that has been processed.
    ulong messages() @safe pure nothrow const @nogc {
        return messages_;
    }

    void setHomeSystem(System* sys) @safe pure nothrow @nogc {
        homeSystem_ = sys;
    }

    /// Free the context of, and drop, all message and request behaviors.
    void cleanupBehavior() @trusted nothrow {
        foreach (ref a; incoming.byValue) {
            try {
                a.free;
            } catch (Exception e) {
                // TODO: call exceptionHandler?
            }
        }
        incoming = null;
        foreach (ref a; reqBehavior.byValue) {
            try {
                a.free;
            } catch (Exception e) {
            }
        }
        reqBehavior = null;
    }

    /// Free the context of, and drop, all awaited responses.
    void cleanupAwait() @trusted nothrow {
        foreach (ref a; awaitedResponses.byValue) {
            try {
                a.behavior.free;
            } catch (Exception e) {
            }
        }
        awaitedResponses = null;
    }

    /// Reset and remove all delayed messages, then destroy the container.
    void cleanupDelayed() @trusted nothrow {
        foreach (const _; 0 .. delayed.length) {
            try {
                delayed.front.msg = Msg.init;
                delayed.removeFront;
            } catch (Exception e) {
            }
        }
        .destroy(delayed);
    }

    /// True in every state except `stopped`.
    bool isAlive() @safe pure nothrow const @nogc {
        final switch (state_) {
        case ActorState.waiting:
            goto case;
        case ActorState.active:
            goto case;
        case ActorState.shutdown:
            goto case;
        case ActorState.forceShutdown:
            goto case;
        case ActorState.finishShutdown:
            return true;
        case ActorState.stopped:
            return false;
        }
    }

    /// Accepting messages.
    bool isAccepting() @safe pure nothrow const @nogc {
        final switch (state_) {
        case ActorState.waiting:
            goto case;
        case ActorState.active:
            goto case;
        case ActorState.shutdown:
            return true;
        case ActorState.forceShutdown:
            goto case;
        case ActorState.finishShutdown:
            goto case;
        case ActorState.stopped:
            return false;
        }
    }

    /// Returns: a new reply id. Ids start at one.
    ulong replyId() @safe {
        return nextReplyId++;
    }

    /** Run one step of the actor.
     *
     * The message counter is reset, then depending on the state the pending
     * messages are handled and/or the actor moves one step closer to
     * `stopped`.
     *
     * Params:
     *  now = the time used to decide which delayed messages and timeouts have
     *        triggered.
     */
    void process(const SysTime now) @safe nothrow {
        import core.memory : GC;

        assert(!GC.inFinalizer);

        messages_ = 0;

        void tick() {
            // philosophy of the order is that a timeout should only trigger if it
            // is really required thus it is checked last.  This order then mean
            // that a request may have triggered a timeout but because
            // `processReply` is called before `checkReplyTimeout` it is *ignored*.
            // Thus "better to accept even if it is timeout rather than fail".
            try {
                processSystemMsg();
                processDelayed(now);
                processIncoming();
                processReply();
                checkReplyTimeout(now);
            } catch (Exception e) {
                exceptionHandler_(this, e);
            }
        }

        assert(state_ == ActorState.stopped || addr, "no address");

        final switch (state_) {
        case ActorState.waiting:
            state_ = ActorState.active;
            tick;
            // the state can be changed before the actor have executed.
            break;
        case ActorState.active:
            tick;
            // self terminate if the actor has no behavior.
            if (incoming.empty && awaitedResponses.empty && reqBehavior.empty)
                state_ = ActorState.forceShutdown;
            break;
        case ActorState.shutdown:
            tick;
            if (awaitedResponses.empty)
                state_ = ActorState.finishShutdown;
            cleanupBehavior;
            break;
        case ActorState.forceShutdown:
            state_ = ActorState.finishShutdown;
            cleanupBehavior;
            addr.get.setClosed;
            break;
        case ActorState.finishShutdown:
            state_ = ActorState.stopped;

            sendToMonitors(DownMsg(addr.weakRef, lastError));

            sendToLinks(ExitMsg(addr.weakRef, lastError));

            replyTimeouts = null;
            cleanupDelayed;
            cleanupAwait;

            // must be last because sendToLinks and sendToMonitors uses addr.
            addr.get.shutdown();
            addr.release;
            break;
        case ActorState.stopped:
            break;
        }
    }

    /// Send `msg` to every monitor that is still reachable, then forget all monitors.
    void sendToMonitors(DownMsg msg) @safe nothrow {
        foreach (ref a; monitors.byValue) {
            try {
                if (auto rc = a.lock.get)
                    rc.put(SystemMsg(msg));
                a.release;
            } catch (Exception e) {
            }
        }

        monitors = null;
    }

    /// Send `msg` to every link that is still reachable, then forget all links.
    void sendToLinks(ExitMsg msg) @safe nothrow {
        foreach (ref a; links.byValue) {
            try {
                if (auto rc = a.lock.get)
                    rc.put(SystemMsg(msg));
                a.release;
            } catch (Exception e) {
            }
        }

        links = null;
    }

    /** Call the error handler of every awaited response whose timeout has
     * passed and remove those responses and timeouts.
     *
     * Relies on `replyTimeouts` being sorted: the scan stops at the first
     * timeout that has not passed.
     */
    void checkReplyTimeout(const SysTime now) @safe {
        if (replyTimeouts.empty)
            return;

        size_t removeTo;
        foreach (const i; 0 .. replyTimeouts.length) {
            if (now > replyTimeouts[i].timeout) {
                const id = replyTimeouts[i].id;
                if (auto v = id in awaitedResponses) {
                    messages_++;
                    v.onError(this, ErrorMsg(addr.weakRef, SystemError.requestTimeout));
                    try {
                        () @trusted { v.behavior.free; }();
                    } catch (Exception e) {
                    }
                    awaitedResponses.remove(id);
                }
                removeTo = i + 1;
            } else {
                break;
            }
        }

        if (removeTo >= replyTimeouts.length) {
            replyTimeouts = null;
        } else if (removeTo != 0) {
            replyTimeouts = replyTimeouts[removeTo .. $];
        }
    }

    /** Handle at most one incoming message.
     *
     * The behavior is looked up on the signature of the message. The default
     * handler is called when there is none.
     */
    void processIncoming() @safe {
        if (addr.get.empty!Msg)
            return;
        messages_++;

        auto front = addr.get.pop!Msg;
        scope (exit)
            .destroy(front);

        void doSend(ref MsgOneShot msg) {
            if (auto v = front.get.signature in incoming) {
                (*v)(msg.data);
            } else {
                defaultHandler_(this, msg.data);
            }
        }

        void doRequest(ref MsgRequest msg) @trusted {
            if (auto v = front.get.signature in reqBehavior) {
                (*v)(msg.data, msg.replyId, msg.replyTo);
            } else {
                defaultHandler_(this, msg.data);
            }
        }

        front.get.type.match!((ref MsgOneShot a) { doSend(a); }, (ref MsgRequest a) {
            doRequest(a);
        });
    }

    /** All system messages are handled.
     *
     * Assuming:
     *  * they are not heavy to process
     *  * very important that if there are any they should be handled as soon as possible
     *  * ignoring the case when there is a "storm" of system messages which
     *    "could" overload the actor system and lead to a crash. I classify this,
     *    for now, as intentional, malicious coding by the developer themself.
     *    External inputs that could trigger such a behavior should be controlled
     *    and limited. Other types of input such as a developer trying to break
     *    the actor system is out of scope.
     */
    void processSystemMsg() @safe {
        //() @trusted {
        //logger.infof("run %X", cast(void*) &this);
        //}();
        while (!addr.get.empty!SystemMsg) {
            messages_++;
            //logger.infof("%X %s %s", addr.toHash, state_, messages_);
            auto front = addr.get.pop!SystemMsg;
            scope (exit)
                .destroy(front);

            front.get.match!((ref DownMsg a) {
                if (downHandler_)
                    downHandler_(this, a);
            }, (ref MonitorRequest a) { monitors[a.addr.toHash] = a.addr; }, (ref DemonitorRequest a) {
                if (auto v = a.addr.toHash in monitors)
                    v.release;
                monitors.remove(a.addr.toHash);
            }, (ref LinkRequest a) { links[a.addr.toHash] = a.addr; }, (ref UnlinkRequest a) {
                if (auto v = a.addr.toHash in links)
                    v.release;
                links.remove(a.addr.toHash);
            }, (ref ErrorMsg a) { errorHandler_(this, a); }, (ref ExitMsg a) {
                exitHandler_(this, a);
            }, (ref SystemExitMsg a) {
                final switch (a.reason) {
                case ExitReason.normal:
                    break;
                case ExitReason.unhandledException:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.unknown:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.userShutdown:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.kill:
                    exitHandler_(this, ExitMsg.init);
                    // the user do NOT have an option here
                    forceShutdown;
                    break;
                }
            });
        }
    }

    /** Handle at most one reply.
     *
     * A reply with an awaited id is passed to its behavior which then is
     * freed and removed together with its timeout. Any other reply is passed
     * to the default handler.
     */
    void processReply() @safe {
        if (addr.get.empty!Reply)
            return;
        messages_++;

        auto front = addr.get.pop!Reply;
        scope (exit)
            .destroy(front);

        if (auto v = front.get.id in awaitedResponses) {
            // TODO: reduce the lookups on front.id
            v.behavior(front.get.data);
            try {
                () @trusted { v.behavior.free; }();
            } catch (Exception e) {
            }
            awaitedResponses.remove(front.get.id);
            removeReplyTimeout(front.get.id);
        } else {
            // TODO: should probably be SystemError.unexpectedResponse?
            defaultHandler_(this, front.get.data);
        }
    }

    /** Move at most one new delayed message into the ordered container and
     * put every delayed message whose trigger time has passed on the address.
     */
    void processDelayed(const SysTime now) @trusted {
        if (!addr.get.empty!DelayedMsg) {
            // count as a message because handling them are "expensive".
            // Ignoring the case that the message right away is moved to the
            // incoming queue. This lead to "double accounting" but ohh well.
            // Don't use delayedSend when you should have used send.
            messages_++;
            delayed.insert(addr.get.pop!DelayedMsg.unsafeMove);
        } else if (delayed.empty) {
            return;
        }

        foreach (const _; 0 .. delayed.length) {
            if (now > delayed.front.triggerAt) {
                addr.get.put(delayed.front.msg);
                delayed.removeFront;
            } else {
                break;
            }
        }
    }

    /** Forget the timeout registered for the reply `id`.
     *
     * `std.algorithm.remove` only moves the trailing elements forward and
     * returns the shortened range. The result must therefore be assigned
     * back, otherwise the array keeps its length and a stale timeout stays
     * at the end.
     */
    private void removeReplyTimeout(ulong id) @safe nothrow {
        import std.algorithm : remove;

        foreach (const i; 0 .. replyTimeouts.length) {
            if (replyTimeouts[i].id == id) {
                replyTimeouts = remove(replyTimeouts, i);
                break;
            }
        }
    }

    /// Register `handler` as the behavior for messages with `signature`. An
    /// existing behavior is freed and replaced. Ignored if the actor is not
    /// accepting messages.
    void register(ulong signature, Closure!(MsgHandler, void*) handler) @trusted {
        if (!isAccepting)
            return;

        if (auto v = signature in incoming) {
            try {
                v.free;
            } catch (Exception e) {
            }
        }
        incoming[signature] = handler;
    }

    /// Register `handler` as the behavior for requests with `signature`. An
    /// existing behavior is freed and replaced. Ignored if the actor is not
    /// accepting messages.
    void register(ulong signature, Closure!(RequestHandler, void*) handler) @trusted {
        if (!isAccepting)
            return;

        if (auto v = signature in reqBehavior) {
            try {
                v.free;
            } catch (Exception e) {
            }
        }
        reqBehavior[signature] = handler;
    }

    /** Await the reply `replyId` until `timeout`.
     *
     * Params:
     *  replyId = id of the awaited reply.
     *  timeout = point in time when the request is considered to have failed.
     *  reply = behavior to call with the reply.
     *  onError = called on timeout. The actor's error handler is used if null.
     */
    void register(ulong replyId, SysTime timeout, Closure!(ReplyHandler,
            void*) reply, ErrorHandler onError) @safe {
        if (!isAccepting)
            return;

        awaitedResponses[replyId] = AwaitReponse(reply, onError is null ? errorHandler_ : onError);
        replyTimeouts ~= ReplyHandlerTimeout(replyId, timeout);
        // keep the earliest timeout first, `checkReplyTimeout` and `nextTimeout` depend on it.
        schwartzSort!(a => a.timeout, (a, b) => a < b)(replyTimeouts);
    }
}
771 
/** A callable `fn` bundled with a context `ctx` that is passed as the first
 * argument on every call, and an optional `cleanup` used to release the
 * context.
 */
struct Closure(Fn, CtxT) {
    /// Signature of the function that releases a context.
    alias FreeFn = void function(CtxT);

    Fn fn;
    CtxT ctx;
    FreeFn cleanup;

    /// A closure without context and cleanup.
    this(Fn fn) {
        this.fn = fn;
    }

    // NOTE(review): `ctx` is declared as `CtxT*` but stored in a field of
    // type `CtxT`. Looks like it was meant to be `CtxT`; confirm with callers.
    this(Fn fn, CtxT* ctx, FreeFn cleanup) {
        this.cleanup = cleanup;
        this.ctx = ctx;
        this.fn = fn;
    }

    /// Call `fn` with the context followed by `args`.
    void opCall(Args...)(auto ref Args args) {
        assert(fn !is null);
        fn(ctx, args);
    }

    /// Release the context, if any, via `cleanup` and reset it.
    void free() {
        // will crash, on purpuse, if there is a ctx and no cleanup registered.
        // maybe a bad idea? dunno... lets see
        if (ctx) {
            cleanup(ctx);
        }
        ctx = CtxT.init;
    }
}
802 
@("shall register a behavior to be called when msg received matching signature")
unittest {
    // an actor with one behavior registered for signature 1.
    auto address = makeAddress2;
    auto uut = Actor(address);

    bool wasCalled;
    void onMsg(void* ctx, ref Variant msg) {
        wasCalled = true;
    }

    uut.register(1, Closure!(MsgHandler, void*)(&onMsg));

    // a one-shot message with the same signature ...
    address.get.put(Msg(1, MsgType(MsgOneShot(Variant(42)))));
    uut.process(Clock.currTime);

    // ... shall trigger the behavior.
    assert(wasCalled);
}
820 
/** Release the resources held by a context of type `CtxT` located at `ctx`.
 *
 * Every mutable field of the tuple is destroyed, which e.g. releases a
 * reference counted object. Fields that are `const`/`immutable` are left
 * untouched. Nothing is done when `CtxT` is `void`.
 *
 * Params:
 *  ctx = pointer to the context. The caller is trusted to pass a pointer to
 *        a `CtxT`.
 */
private void cleanupCtx(CtxT)(void* ctx)
        if (is(CtxT == Tuple!T, T) || is(CtxT == void)) {
    import std.traits : Unqual;

    static if (!is(CtxT == void)) {
        // trust that any use of this also pass on the correct context type.
        auto userCtx = () @trusted { return cast(CtxT*) ctx; }();

        // release the context such as if it holds a rc object.
        static foreach (const i; 0 .. CtxT.Types.length) {
            {
                alias T = CtxT.Types[i];
                alias UT = Unqual!T;

                static if (is(UT == T)) {
                    .destroy((*userCtx)[i]);
                } else {
                    // a qualified field can't be released. That is only
                    // acceptable for types that do not hold an address.
                    import my.actor.typed;

                    static assert(!is(UT : WeakAddress),
                            "WeakAddress must NEVER be const or immutable");
                    static assert(!is(UT : TypedAddress!M, M...),
                            "TypedAddress must NEVER be const or immutable: " ~ T.stringof);
                    // TODO: add a -version actor_ctx_diagnostic that prints when it is unable to deinit?
                }
            }
        }
    }
}
851 
@("shall default initialize when possible, skipping const/immutable")
unittest {
    // builtin types: only the mutable field is reset.
    {
        auto ctx = tuple(cast(const) 42, 43);
        alias CtxT = typeof(ctx);
        cleanupCtx!CtxT(cast(void*)&ctx);
        assert(ctx[0] == 42); // can't assign to const
        assert(ctx[1] == 0);
    }

    // a struct: the const field keeps its value.
    {
        import my.path : Path;

        auto ctx = tuple(Path.init, cast(const) Path("foo"));
        alias CtxT = typeof(ctx);
        cleanupCtx!CtxT(cast(void*)&ctx);
        assert(ctx[0] == Path.init);
        assert(ctx[1] == Path("foo"));
    }
}
872 
/// A behavior for one-shot messages together with the signature it is registered under.
package struct Action {
    /// The behavior to call.
    Closure!(MsgHandler, void*) action;
    /// Signature computed from the handler's message parameters.
    ulong signature;
}
877 
/** Wrap `handler` in an `Action`.
 *
 * The signature is derived from the unqualified parameter types of the
 * handler. When `CtxT` is not `void` the first parameter is the context and
 * is excluded from the signature.
 */
package auto makeAction(T, CtxT = void)(T handler) @safe
        if (isFunction!T || isFunctionPointer!T) {
    enum hasCtx = !is(CtxT == void);

    static if (hasCtx) {
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    } else {
        alias Params = Parameters!T;
    }

    alias HArgs = staticMap!(Unqual, Params);

    // unpacks the message and forwards it to the handler.
    void dispatch(void* rawCtx, ref Variant msg) @trusted {
        static if (hasCtx) {
            auto typedCtx = cast(CtxParam*) cast(CtxT*) rawCtx;
            handler(*typedCtx, msg.get!(Tuple!HArgs).expand);
        } else {
            handler(msg.get!(Tuple!HArgs).expand);
        }
    }

    alias ActionClosure = typeof(Action.action);
    return Action(ActionClosure(&dispatch, null, &cleanupCtx!CtxT), makeSignature!HArgs);
}
903 
/** Wrap `handler` in a closure that is called with the reply to a request.
 *
 * When `CtxT` is not `void` the first parameter of the handler is the
 * context and the remaining parameters are unpacked from the reply.
 */
package Closure!(ReplyHandler, void*) makeReply(T, CtxT)(T handler) @safe {
    enum hasCtx = !is(CtxT == void);

    static if (hasCtx) {
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    } else {
        alias Params = Parameters!T;
    }

    alias HArgs = staticMap!(Unqual, Params);

    // unpacks the reply and forwards it to the handler.
    void dispatch(void* rawCtx, ref Variant msg) @trusted {
        static if (hasCtx) {
            auto typedCtx = cast(CtxParam*) cast(CtxT*) rawCtx;
            handler(*typedCtx, msg.get!(Tuple!HArgs).expand);
        } else {
            handler(msg.get!(Tuple!HArgs).expand);
        }
    }

    alias ReplyClosure = typeof(return);
    return ReplyClosure(&dispatch, null, &cleanupCtx!CtxT);
}
927 
/// A behavior for requests together with the signature it is registered under.
package struct Request {
    /// The behavior to call.
    Closure!(RequestHandler, void*) request;
    /// Signature the request behavior is keyed on.
    ulong signature;
}
932 
/// Join the three first elements of `Loc` with ':'. The second and third
/// are converted to strings.
private string locToString(Loc...)() {
    import std.conv : to;

    string result = Loc[0];
    result ~= ":";
    result ~= Loc[1].to!string;
    result ~= ":";
    result ~= Loc[2].to!string;
    return result;
}
938 
/// Compile time check that the first parameter of `handler`, the context, is
/// `ref`. The type of the handler is printed before the compilation fails.
package void checkRefForContext(alias handler)() {
    import std.traits : ParameterStorageClass, ParameterStorageClassTuple;

    alias HandlerT = typeof(handler);
    enum ctxIsRef = ParameterStorageClassTuple!(HandlerT)[0] == ParameterStorageClass.ref_;

    static if (!ctxIsRef) {
        pragma(msg, "INFO: handler type is " ~ HandlerT.stringof);
        static assert(ctxIsRef, "The context must be `ref` to avoid unnecessary copying");
    }
}
951 
/** Verify at compile time that a handler's first parameter fits the context.
 *
 * Passes when `CtxParam` and `CtxT` are the same type, or when a `CtxParam`
 * can be constructed from the expanded fields of a `CtxT`. Otherwise the
 * compilation fails with a message naming both types.
 */
package void checkMatchingCtx(CtxParam, CtxT)() {
    enum sameType = is(CtxT == CtxParam);
    enum constructible = __traits(compiles, { auto x = CtxParam(CtxT.init.expand); });

    static assert(sameType || constructible,
            "mismatch between the context type " ~ CtxT.stringof
            ~ " and the first parameter " ~ CtxParam.stringof);
}
959 
/** Wrap `handler` in a type-erased request closure together with its signature.
 *
 * The handler must return a value. How the value is turned into a reply
 * depends on its type:
 *  - `RequestResult!X`: an `ErrorMsg` is sent as a system message to the
 *    requester, a `Promise!X` is bound to the requester, any other value is
 *    sent as a `Reply`.
 *  - `Promise!X`: the promise is bound to the requester (`replyId`/`replyTo`)
 *    so that it can be delivered later.
 *  - anything else: sent directly as a `Reply`.
 * A value that is sent as a `Reply` is wrapped in a `Tuple` unless it already
 * is one.
 *
 * Params:
 *  T = type of the handler.
 *  CtxT = type of the context stored behind the closure's `void*`, or `void`
 *      when the handler takes no context. With a context the first parameter
 *      of `handler` is the context and must be `ref`.
 *  handler = invoked with the expanded `Tuple` stored in the received `Variant`.
 *
 * Returns: a `Request` whose closure is constructed with `null` in the
 * context slot and whose signature is `makeSignature` of the unqualified
 * message parameters.
 */
package auto makeRequest(T, CtxT = void)(T handler) @safe {
    static assert(!is(ReturnType!T == void), "handler returns void, not allowed");

    alias RType = ReturnType!T;
    enum isReqResult = is(RType : RequestResult!ReqT, ReqT);
    enum isPromise = is(RType : Promise!PromT, PromT);

    static if (is(CtxT == void))
        alias Params = Parameters!T;
    else {
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    }

    alias HArgs = staticMap!(Unqual, Params);

    void fn(void* rawCtx, ref Variant msg, ulong replyId, WeakAddress replyTo) @trusted {
        static if (is(CtxT == void)) {
            auto r = handler(msg.get!(Tuple!HArgs).expand);
        } else {
            auto ctx = cast(CtxParam*) cast(CtxT*) rawCtx;
            auto r = handler(*ctx, msg.get!(Tuple!HArgs).expand);
        }

        static if (isReqResult) {
            r.value.match!((ErrorMsg a) { sendSystemMsg(replyTo, a); }, (Promise!ReqT a) {
                assert(!a.data.empty, "the promise MUST be constructed before it is returned");
                a.data.get.replyId = replyId;
                a.data.get.replyTo = replyTo;
            }, (data) {
                enum wrapInTuple = !is(typeof(data) : Tuple!U, U);
                // the reply is dropped if the requester's address can no longer be locked.
                if (auto rc = replyTo.lock.get) {
                    static if (wrapInTuple)
                        rc.put(Reply(replyId, Variant(tuple(data))));
                    else
                        rc.put(Reply(replyId, Variant(data)));
                }
            });
        } else static if (isPromise) {
            // same precondition as for a promise returned via RequestResult.
            assert(!r.data.empty, "the promise MUST be constructed before it is returned");
            r.data.get.replyId = replyId;
            r.data.get.replyTo = replyTo;
        } else {
            // TODO: is this syntax for U one variable or variable. I want it to be variable.
            // NOTE(review): `U` is a single template parameter; verify whether
            // tuples with more than one field match (`U...` is the variadic form).
            enum wrapInTuple = !is(RType : Tuple!U, U);
            // the reply is dropped if the requester's address can no longer be locked.
            if (auto rc = replyTo.lock.get) {
                static if (wrapInTuple)
                    rc.put(Reply(replyId, Variant(tuple(r))));
                else
                    rc.put(Reply(replyId, Variant(r)));
            }
        }
    }

    return Request(typeof(Request.request)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs);
}
1017 
@("shall link two actors lifetime")
unittest {
    // Number of times the exit handler below has run, summed over both actors.
    int count;
    // Exit handler installed on both actors: record the call and shut the actor down.
    void countExits(ref Actor self, ExitMsg msg) @safe nothrow {
        count++;
        self.shutdown;
    }

    auto aa1 = Actor(makeAddress2);
    auto a1 = build(&aa1).set((int x) {}).exitHandler_(&countExits).finalize;
    auto aa2 = Actor(makeAddress2);
    auto a2 = build(&aa2).set((int x) {}).exitHandler_(&countExits).finalize;

    // Link a1 to a2 and let each actor process once.
    a1.linkTo(a2.address);
    a1.process(Clock.currTime);
    a2.process(Clock.currTime);

    // Linking alone must not terminate any of the actors.
    assert(a1.isAlive);
    assert(a2.isAlive);

    // Only a1 is told to exit.
    sendExit(a1.address, ExitReason.userShutdown);
    // Run both actors a few rounds so the exit has time to reach a2 too.
    foreach (_; 0 .. 5) {
        a1.process(Clock.currTime);
        a2.process(Clock.currTime);
    }

    // Both actors are expected to be down and the exit handler to have run once in each.
    assert(!a1.isAlive);
    assert(!a2.isAlive);
    assert(count == 2);
}
1048 
@("shall let one actor monitor the lifetime of the other one")
unittest {
    // Number of times the down handler below has run.
    int count;
    // Down handler installed on a1 only; it just records the call.
    void downMsg(ref Actor self, DownMsg msg) @safe nothrow {
        count++;
    }

    auto aa1 = Actor(makeAddress2);
    auto a1 = build(&aa1).set((int x) {}).downHandler_(&downMsg).finalize;
    auto aa2 = Actor(makeAddress2);
    auto a2 = build(&aa2).set((int x) {}).finalize;

    // a1 monitors a2, then each actor processes once.
    a1.monitor(a2.address);
    a1.process(Clock.currTime);
    a2.process(Clock.currTime);

    // Monitoring alone must not terminate any of the actors.
    assert(a1.isAlive);
    assert(a2.isAlive);

    // The monitored actor, a2, is told to exit.
    sendExit(a2.address, ExitReason.userShutdown);
    // Run both actors a few rounds so a1 has time to be notified.
    foreach (_; 0 .. 5) {
        a1.process(Clock.currTime);
        a2.process(Clock.currTime);
    }

    // Unlike a link the monitoring actor stays alive; it is notified exactly once.
    assert(a1.isAlive);
    assert(!a2.isAlive);
    assert(count == 1);
}
1078 
/** Fluent helper that installs handlers and behaviors on an `Actor`.
 *
 * Every setter returns the builder so that calls can be chained; the chain is
 * ended with `finalize`.
 */
private struct BuildActor {
    Actor* actor;

    /// Returns: the actor being built. The builder lets go of it and must not be used afterwards.
    Actor* finalize() @safe {
        scope (exit)
            actor = null;
        return actor;
    }

    /// Install `a` as the actor's error handler.
    auto errorHandler(ErrorHandler a) {
        actor.errorHandler = a;
        return this;
    }

    /// Install `a` as the actor's down handler.
    auto downHandler_(DownHandler a) {
        actor.downHandler_ = a;
        return this;
    }

    /// Install `a` as the actor's exit handler.
    auto exitHandler_(ExitHandler a) {
        actor.exitHandler_ = a;
        return this;
    }

    /// Install `a` as the actor's exception handler.
    auto exceptionHandler_(ExceptionHandler a) {
        actor.exceptionHandler_ = a;
        return this;
    }

    /// Install `a` as the actor's default handler.
    auto defaultHandler_(DefaultHandler a) {
        actor.defaultHandler_ = a;
        return this;
    }

    /// Register a behavior that returns a value, i.e. a request handler.
    auto set(BehaviorT)(BehaviorT behavior)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && !is(ReturnType!BehaviorT == void)) {
        auto req = makeRequest(behavior);
        actor.register(req.signature, req.request);
        return this;
    }

    /// Register a request handler together with the context `c` that is passed to it.
    auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && !is(ReturnType!BehaviorT == void)) {
        auto req = makeRequest!(BehaviorT, CT)(behavior);
        // the context is a copy of `c`, for now allocated by the GC.
        // TODO: use an allocator.
        auto ctx = new CT(c);
        req.request.ctx = cast(void*) ctx;
        actor.register(req.signature, req.request);
        return this;
    }

    /// Register a behavior that returns `void`, i.e. an action.
    auto set(BehaviorT)(BehaviorT behavior)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && is(ReturnType!BehaviorT == void)) {
        auto action = makeAction(behavior);
        actor.register(action.signature, action.action);
        return this;
    }

    /// Register an action together with the context `c` that is passed to it.
    auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && is(ReturnType!BehaviorT == void)) {
        auto action = makeAction!(BehaviorT, CT)(behavior);
        // the context is a copy of `c`, for now allocated by the GC.
        // TODO: use an allocator.
        auto ctx = new CT(c);
        action.action.ctx = cast(void*) ctx;
        actor.register(action.signature, action.action);
        return this;
    }
}
1151 
/// Start configuring the actor `a`; end the call chain with `finalize` to get the actor back.
package BuildActor build(Actor* a) @safe {
    BuildActor builder;
    builder.actor = a;
    return builder;
}
1155 
/** Implement an actor by registering `behaviors` on `self`.
 *
 * Each behavior must be a function or function pointer. A behavior may be
 * directly followed by a capture, which is then registered as the context of
 * that behavior. A capture that does not directly follow a behavior is
 * rejected at compile time instead of being silently ignored.
 *
 * Params:
 *  self = the actor to configure.
 *  behaviors = behaviors, each optionally followed by its capture.
 *
 * Returns: `self`.
 */
Actor* impl(Behavior...)(Actor* self, Behavior behaviors) {
    import my.actor.msg : isCapture;

    auto bactor = build(self);
    static foreach (const i; 0 .. Behavior.length) {
        {
            alias b = Behavior[i];

            static if (isCapture!b) {
                // a capture is consumed together with the behavior right
                // before it; anything else means it would be dropped.
                static if (i == 0) {
                    static assert(0,
                            "a capture must directly follow the behavior it belongs to: "
                            ~ b.stringof);
                } else static if (isCapture!(Behavior[i - 1])) {
                    static assert(0,
                            "a capture must directly follow the behavior it belongs to: "
                            ~ b.stringof);
                }
            } else {
                static if (!(isFunction!(b) || isFunctionPointer!(b)))
                    static assert(0, "behavior may only be functions, not delegates: " ~ b.stringof);

                static if (i + 1 < Behavior.length && isCapture!(Behavior[i + 1])) {
                    bactor.set(behaviors[i], behaviors[i + 1]);
                } else
                    bactor.set(behaviors[i]);
            }
        }
    }

    return bactor.finalize;
}
1179 
// There are no assertions; the test passes if the chain below compiles and runs.
@("build dynamic actor from functions")
unittest {
    // returns void, registered through the `set` overload for actions.
    static void fn3(int s) @safe {
    }

    // returns a plain value, registered through the `set` overload for requests.
    static string fn4(int s) @safe {
        return "foo";
    }

    // returns a Tuple and takes a const qualified parameter.
    static Tuple!(int, string) fn5(const string s) @safe {
        return typeof(return)(42, "hej");
    }

    auto aa1 = Actor(makeAddress2);
    auto a1 = build(&aa1).set(&fn3).set(&fn4).set(&fn5).finalize;
}
1196 
// Delayed messages: one that is already due and one that is due in an hour.
unittest {
    // set by fn1 through its captured pointer.
    bool delayOk;
    static void fn1(ref Tuple!(bool*, "delayOk") c, const string s) @safe {
        *c.delayOk = true;
    }

    // set by fn2 through its captured pointer; expected to stay false.
    bool delayShouldNeverHappen;
    static void fn2(ref Tuple!(bool*, "delayShouldNeverHappen") c, int s) @safe {
        *c.delayShouldNeverHappen = true;
    }

    auto aa1 = Actor(makeAddress2);
    auto actor = build(&aa1).set(&fn1, capture(&delayOk)).set(&fn2,
            capture(&delayShouldNeverHappen)).finalize;
    // the string message (fn1) has a trigger time one second in the past ...
    delayedSend(actor.address, Clock.currTime - 1.dur!"seconds", "foo");
    // ... while the int message (fn2) is not due until one hour from now.
    delayedSend(actor.address, Clock.currTime + 1.dur!"hours", 42);

    // both messages start out in the mailbox for delayed messages.
    assert(!actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    actor.process(Clock.currTime);

    // after one round the delayed mailbox is still not empty.
    assert(!actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    actor.process(Clock.currTime);
    actor.process(Clock.currTime);

    // after two more rounds all mailboxes of the address are empty.
    assert(actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    // only the message that was due has been handled.
    assert(delayOk);
    assert(!delayShouldNeverHappen);
}
1234 
@("shall process a request->then chain xyz")
@system unittest {
    // checking capture is correctly setup/teardown by using captured rc.

    // captured by the request handler; its refcount shows whether the capture is alive.
    auto rcReq = refCounted(42);
    bool calledOk;
    // request handler: one reference is held by the test and one by the capture.
    static string fn(ref Tuple!(bool*, "calledOk", RefCounted!int) ctx, const string s,
            const string b) {
        assert(2 == ctx[1].refCount);
        if (s == "apa")
            *ctx.calledOk = true;
        return "foo";
    }

    // captured by the reply handler.
    auto rcReply = refCounted(42);
    bool calledReply;
    // reply handler: records whether the reply is the value returned by `fn`.
    static void reply(ref Tuple!(bool*, RefCounted!int) ctx, const string s) {
        *ctx[0] = s == "foo";
        assert(2 == ctx[1].refCount);
    }

    auto aa1 = Actor(makeAddress2);
    auto actor = build(&aa1).set(&fn, capture(&calledOk, rcReq)).finalize;

    // the registered behavior holds a reference, the reply capture doesn't exist yet.
    assert(2 == rcReq.refCount);
    assert(1 == rcReply.refCount);

    // the actor sends the request to itself.
    actor.request(actor.address, infTimeout).send("apa", "foo")
        .capture(&calledReply, rcReply).then(&reply);
    assert(2 == rcReply.refCount);

    // the request is waiting in the mailbox, no reply has been produced.
    assert(!actor.addr.get.empty!Msg);
    assert(actor.addr.get.empty!Reply);

    // after one round both the request and its reply are expected to be consumed.
    actor.process(Clock.currTime);
    assert(actor.addr.get.empty!Msg);
    assert(actor.addr.get.empty!Reply);

    assert(2 == rcReq.refCount);
    assert(1 == rcReply.refCount, "after the message is consumed the refcount should go back");

    assert(calledOk);
    assert(calledReply);

    // run the actor until it has terminated.
    actor.shutdown;
    while (actor.isAlive)
        actor.process(Clock.currTime);
}
1283 
@("shall process a request->then chain using promises")
unittest {
    // A and B are distinct message types so that fn1 and fn2 get different signatures.
    static struct A {
        string v;
    }

    static struct B {
        string v;
    }

    // number of times fn1/fn2 have been called.
    int calledOk;
    // promise returned by fn1, wrapped in a RequestResult.
    auto fn1p = makePromise!string;
    static RequestResult!string fn1(ref Capture!(int*, "calledOk", Promise!string, "p") c, A a) @trusted {
        if (a.v == "apa")
            (*c.calledOk)++;
        return typeof(return)(c.p);
    }

    // promise returned directly by fn2.
    auto fn2p = makePromise!string;
    static Promise!string fn2(ref Capture!(int*, "calledOk", Promise!string, "p") c, B a) {
        (*c.calledOk)++;
        return c.p;
    }

    // number of replies with the value "foo" that have been received.
    int calledReply;
    static void reply(ref Tuple!(int*) ctx, const string s) {
        if (s == "foo")
            *ctx[0] += 1;
    }

    auto aa1 = Actor(makeAddress2);
    auto actor = build(&aa1).set(&fn1, capture(&calledOk, fn1p)).set(&fn2,
            capture(&calledOk, fn2p)).finalize;

    // the actor sends both requests to itself.
    actor.request(actor.address, infTimeout).send(A("apa")).capture(&calledReply).then(&reply);
    actor.request(actor.address, infTimeout).send(B("apa")).capture(&calledReply).then(&reply);

    actor.process(Clock.currTime);
    assert(calledOk == 1); // first request
    assert(calledReply == 0);

    // delivering the promise does not by itself run the reply handler ...
    fn1p.deliver("foo");

    assert(calledReply == 0);

    // ... it runs when the actor processes the next time.
    actor.process(Clock.currTime);
    assert(calledOk == 2); // second request triggered
    assert(calledReply == 1);

    fn2p.deliver("foo");
    actor.process(Clock.currTime);

    assert(calledReply == 2);

    // run the actor until it has terminated.
    actor.shutdown;
    while (actor.isAlive) {
        actor.process(Clock.currTime);
    }
}
1343 
/** Thrown by a `ScopedActor` when a request could not be completed.
 *
 * The reason is available in `error` and is described by the exception
 * message.
 */
class ScopedActorException : Exception {
    /**
     * Params:
     *  err = why the request failed.
     *  file = source file reported by the exception.
     *  line = source line reported by the exception.
     */
    this(ScopedActorError err, string file = __FILE__, int line = __LINE__) @safe pure nothrow {
        super(describe(err), file, line);
        error = err;
    }

    /// The reason the request failed.
    ScopedActorError error;

    /// Returns: a human readable description of `err`, used as the exception message.
    private static string describe(ScopedActorError err) @safe pure nothrow {
        final switch (err) {
        case ScopedActorError.none:
            return "scoped actor: no error";
        case ScopedActorError.down:
            return "scoped actor: the actor address is down";
        case ScopedActorError.timeout:
            return "scoped actor: the request timed out";
        case ScopedActorError.unknownMsg:
            return "scoped actor: the received message could not be processed";
        case ScopedActorError.fatal:
            return "scoped actor: a fatal error occurred";
        }
    }
}
1353 
/// The reason a request made through a `ScopedActor` failed.
enum ScopedActorError : ubyte {
    /// no error has been recorded
    none,
    // the actor address is down
    down,
    // the request timed out
    timeout,
    // the address was unable to process the received message
    unknownMsg,
    // some type of fatal error occurred.
    fatal,
}
1365 
/** Intended to be used in a local scope by a user.
 *
 * `ScopedActor` is not thread safe.
 *
 * A request is made with `request(...).send(...).then(...)`. `then` blocks,
 * driving the internal actor, until the reply has been handled and throws a
 * `ScopedActorException` if the request fails.
 */
struct ScopedActor {
    import my.actor.typed : underlyingAddress, underlyingWeakAddress;

    private {
        static struct Data {
            /// The actor that sends the requests and receives the replies.
            Actor self;
            /// Error recorded by the handlers that are installed while waiting in `then`.
            ScopedActorError errSt;

            ~this() @safe {
                // nothing to shut down if the actor never got an address.
                if (self.addr.empty)
                    return;

                // restore the default handlers before the actor is run to completion.
                () @trusted {
                    self.downHandler = null;
                    self.defaultHandler = toDelegate(&.defaultHandler);
                    self.errorHandler = toDelegate(&defaultErrorHandler);
                }();

                self.shutdown;
                while (self.isAlive) {
                    self.process(Clock.currTime);
                }
            }
        }

        RefCounted!Data data;
    }

    /**
     * Params:
     *  addr = address of the internal actor.
     *  name = name given to the internal actor.
     */
    this(StrongAddress addr, string name) @safe {
        data = refCounted(Data(Actor(addr)));
        data.get.self.name = name;
    }

    /// Clear the error left by a previous request.
    private void reset() @safe nothrow {
        data.get.errSt = ScopedActorError.none;
    }

    /** Start a request to `requestTo`.
     *
     * Params:
     *  requestTo = the address to send the request to.
     *  timeout = passed on to the underlying `request`.
     */
    SRequestSend request(TAddress)(TAddress requestTo, SysTime timeout)
            if (isAddress!TAddress) {
        reset;
        auto rs = .request(&data.get.self, underlyingWeakAddress(requestTo), timeout);
        return SRequestSend(rs, this);
    }

    private static struct SRequestSend {
        RequestSend rs;
        ScopedActor self;

        /// Copy constructor
        this(ref return typeof(this) rhs) @safe pure nothrow @nogc {
            rs = rhs.rs;
            self = rhs.self;
        }

        @disable this(this);

        /// Send `args` as the request message.
        SRequestSendThen send(Args...)(auto ref Args args) {
            return SRequestSendThen(.send(rs, args), self);
        }
    }

    private static struct SRequestSendThen {
        RequestSendThen rs;
        ScopedActor self;
        /// Current sleep time in microseconds between polls of the mailbox.
        uint backoff;

        /// Copy constructor
        this(ref return typeof(this) rhs) {
            rs = rhs.rs;
            self = rhs.self;
            backoff = rhs.backoff;
        }

        @disable this(this);

        /// Sleep `backoff` microseconds and then increase it, up to at most 20 ms.
        void dynIntervalSleep() @trusted {
            // +100 usecs "feels good", magic number. current OS and
            // implementation of message passing isn't that much faster than
            // 100us. A bit slow behavior, ehum, for a scoped actor is OK. They
            // aren't expected to be used for "time critical" sections.
            Thread.sleep(backoff.dur!"usecs");
            backoff = min(backoff + 100, 20000);
        }

        /// Handlers that translate what happens to the internal actor into a `ScopedActorError`.
        private static struct ValueCapture {
            RefCounted!Data data;

            void downHandler(ref Actor, DownMsg) @safe nothrow {
                data.get.errSt = ScopedActorError.down;
            }

            void errorHandler(ref Actor, ErrorMsg msg) @safe nothrow {
                if (msg.reason == SystemError.requestTimeout)
                    data.get.errSt = ScopedActorError.timeout;
                else
                    data.get.errSt = ScopedActorError.fatal;
            }

            void unknownMsgHandler(ref Actor a, ref Variant msg) @safe nothrow {
                logAndDropHandler(a, msg);
                data.get.errSt = ScopedActorError.unknownMsg;
            }
        }

        /** Register `handler` for the reply and block until it has been handled.
         *
         * Params:
         *  handler = called with the reply.
         *  onError = passed on to the underlying `thenUnsafe`.
         *
         * Throws: `ScopedActorException` if the receiver's address can't be
         * locked or if an error is recorded while waiting for the reply.
         */
        void then(T)(T handler, ErrorHandler onError = null) {
            scope (exit)
                demonitor(rs.rs.self, rs.rs.requestTo);
            monitor(rs.rs.self, rs.rs.requestTo);

            // registered before the handlers are replaced so that the defaults
            // are restored on every exit path, including when `thenUnsafe`
            // throws. The temporary delegates would otherwise stay installed.
            scope (exit)
                () @trusted {
                self.data.get.self.downHandler = null;
                self.data.get.self.defaultHandler = toDelegate(&.defaultHandler);
                self.data.get.self.errorHandler = toDelegate(&defaultErrorHandler);
            }();

            auto callback = new ValueCapture(self.data);
            self.data.get.self.downHandler = &callback.downHandler;
            self.data.get.self.defaultHandler = &callback.unknownMsgHandler;
            self.data.get.self.errorHandler = &callback.errorHandler;

            () @trusted { .thenUnsafe!(T, void)(rs, handler, null, onError); }();

            auto requestTo = rs.rs.requestTo.lock;
            if (!requestTo)
                throw new ScopedActorException(ScopedActorError.down);

            // TODO: this loop is stupid... should use a conditional variable
            // instead but that requires changing the mailbox. later
            do {
                rs.rs.self.process(Clock.currTime);
                // force the actor to be alive even though there are no behaviors.
                rs.rs.self.state_ = ActorState.waiting;

                if (self.data.get.errSt == ScopedActorError.none) {
                    dynIntervalSleep;
                } else {
                    throw new ScopedActorException(self.data.get.errSt);
                }

            }
            while (self.data.get.self.waitingForReply);
        }
    }
}
1515 
/** Create a `ScopedActor` with a new address.
 *
 * The actor is named "ScopedActor.<file>:<line>" where `file` and `line`
 * default to the location of the instantiation.
 */
ScopedActor scopedActor(string file = __FILE__, uint line = __LINE__)() @safe {
    import std.format : format;

    auto actorName = format!"ScopedActor.%s:%s"(file, line);
    return ScopedActor(makeAddress2, actorName);
}
1521 
@(
        "scoped actor shall throw an exception if the actor that is sent a request terminates or is closed")
unittest {
    import my.actor.system;

    auto sys = makeSystem;

    // The receiving actor has three behaviors:
    //  int: sleeps 50 ms before it replies with 42.
    //  double: does nothing.
    //  string: shuts the actor down and replies with 42.
    auto a0 = sys.spawn((Actor* self) {
        return impl(self, (ref CSelf!() ctx, int x) {
            Thread.sleep(50.dur!"msecs");
            return 42;
        }, capture(self), (ref CSelf!() ctx, double x) {}, capture(self),
            (ref CSelf!() ctx, string x) { ctx.self.shutdown; return 42; }, capture(self));
    });

    // A request with a one nanosecond timeout to the slow `int` behavior is
    // expected to fail with a timeout. Retried for at most three seconds.
    {
        auto self = scopedActor;
        bool excThrown;
        auto stopAt = Clock.currTime + 3.dur!"seconds";
        while (!excThrown && Clock.currTime < stopAt) {
            try {
                self.request(a0, delay(1.dur!"nsecs")).send(42).then((int x) {});
            } catch (ScopedActorException e) {
                excThrown = e.error == ScopedActorError.timeout;
            } catch (Exception e) {
                logger.info(e.msg);
            }
        }
        assert(excThrown, "timeout did not trigger as expected");
    }

    // The `string` behavior shuts the receiver down; a request is expected to
    // end up being reported as `down`. Retried for at most three seconds.
    {
        auto self = scopedActor;
        bool excThrown;
        auto stopAt = Clock.currTime + 3.dur!"seconds";
        while (!excThrown && Clock.currTime < stopAt) {
            try {
                self.request(a0, delay(1.dur!"seconds")).send("hello").then((int x) {
                });
            } catch (ScopedActorException e) {
                excThrown = e.error == ScopedActorError.down;
            } catch (Exception e) {
                logger.info(e.msg);
            }
        }
        assert(excThrown, "detecting terminated actor did not trigger as expected");
    }
}